package com.xnx3.obs.sink;

import com.xnx3.obs.config.HoodieWriterConfig;
import com.xnx3.obs.util.SparkUtil;
import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;

/**
 * Hudi write to OBS
 * @author aly
 */
public class HoodieObsSink {

    private static final Logger LOGGER = LoggerFactory.getLogger(HoodieObsSink.class);

    /**
     * Writes the given JSON records to Hudi on OBS using {@link SaveMode#Overwrite}.
     * Note: this mutates {@code config} by setting its save mode before delegating.
     *
     * @param spark  active Spark session
     * @param config Hudi run configuration; its save mode is set to Overwrite
     * @param datas  JSON record strings to be written out
     */
    public void overwriteToObs(SparkSession spark, HoodieWriterConfig config, List<String> datas) {
        writeWithMode(spark, config, datas, SaveMode.Overwrite);
    }

    /**
     * Writes the given JSON records to Hudi on OBS using {@link SaveMode#Append}.
     * Note: this mutates {@code config} by setting its save mode before delegating.
     *
     * @param spark  active Spark session
     * @param config Hudi run configuration; its save mode is set to Append
     * @param datas  JSON record strings to be written out
     */
    public void appendToObs(SparkSession spark, HoodieWriterConfig config, List<String> datas) {
        writeWithMode(spark, config, datas, SaveMode.Append);
    }

    /**
     * Stamps the requested save mode onto the configuration, then performs the write.
     *
     * @param spark  active Spark session
     * @param config Hudi run configuration to receive the save mode
     * @param datas  JSON record strings to be written out
     * @param mode   Spark save mode to apply
     */
    private void writeWithMode(SparkSession spark, HoodieWriterConfig config, List<String> datas, SaveMode mode) {
        config.saveMode(mode);
        writeHudi(spark, config, datas);
    }

    /**
     * Writes raw JSON record strings to Hudi: the strings are parallelized into an
     * RDD, read back as a JSON dataset (schema inferred by Spark), and written out.
     *
     * @param spark  active Spark session
     * @param config Hudi run configuration
     * @param datas  JSON record strings to be written out
     */
    public void writeHudi(SparkSession spark, HoodieWriterConfig config, List<String> datas) {
        // Wrap the existing SparkContext; parallelize the raw strings so
        // spark.read().json(...) can infer a schema and build the dataset.
        JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());
        writeHudi(spark.read().json(jsc.parallelize(datas)), config);
    }

    /**
     * Writes a Spark dataset to the Hudi table on OBS described by {@code config}.
     *
     * @param df     dataset to write
     * @param config Hudi run configuration supplying table name, key fields,
     *               key generator, save mode, and base path
     */
    public void writeHudi(Dataset<Row> df, HoodieWriterConfig config) {
        df.write()
                .format("hudi")
                // Defaults first so the per-job options from config can override them.
                .options(SparkUtil.getDefaultWriteConfigs())
                .options(config.options())
                .option(HoodieWriteConfig.TABLE_NAME, config.tableName())
                .option(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY(), config.keyGenerator())
                .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), config.recordkeyFieldOptKey())
                .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY(), config.precombineFieldOptKey())
                .mode(config.saveMode())
                .save(config.basePath());
    }

}
